import sys
import socket
import time
+import types
from twisted.internet import reactor
from twisted.internet import defer
#defer.Deferred.debug = 1
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import ClientFactory
+from twisted.python.failure import Failure
import sxp
import XendDB
sxp.show(req, out=self.transport)
def loseConnection(self):
- print 'Xfrd>loseConnection>'
self.transport.loseConnection()
def connectionLost(self, reason):
- print 'Xfrd>connectionLost>', reason
self.xinfo.connectionLost(reason)
def dataReceived(self, data):
self.xinfo = xinfo
def startedConnecting(self, connector):
- print 'Started to connect', 'self=', self, 'connector=', connector
+ pass
def buildProtocol(self, addr):
- print 'buildProtocol>', addr
return Xfrd(self.xinfo)
def clientConnectionLost(self, connector, reason):
- print 'clientConnectionLost>', 'connector=', connector, 'reason=', reason
+ pass
def clientConnectionFailed(self, connector, reason):
- print 'clientConnectionFailed>', 'connector=', connector, 'reason=', reason
self.xinfo.error(reason)
class XfrdInfo:
"""Suspend timeout (seconds).
We set a timeout because suspending a domain can hang."""
- timeout = 30
+ timeout = 10
def __init__(self):
from xen.xend import XendDomain
self.deferred = defer.Deferred()
self.suspended = {}
self.paused = {}
+ self.state = 'init'
+ # List of errors encountered.
+ self.errors = []
def vmconfig(self):
dominfo = self.xd.domain_get(self.src_dom)
val = None
return val
+ def add_error(self, err):
+ """Add an error to the error list.
+ Returns the error added (which may have been unwrapped if it
+ was a Twisted Failure).
+ """
+ while isinstance(err, Failure):
+ err = err.value
+ if err not in self.errors:
+ self.errors.append(err)
+ return err
+
+ def error_summary(self, msg=None):
+ """Get a XendError summarising the errors (if any).
+ """
+ if msg is None:
+ msg = "errors"
+ if self.errors:
+ errmsg = msg + ': ' + ', '.join(map(str, self.errors))
+ else:
+ errmsg = msg
+ return XendError(errmsg)
+
+ def get_errors(self):
+ """Get the list of errors.
+ """
+ return self.errors
+
def error(self, err):
- print 'Error>', err
self.state = 'error'
+ self.add_error(err)
if not self.deferred.called:
- print 'Error> calling errback'
- self.deferred.errback(err)
+ self.deferred.errback(self.error_summary())
def dispatch(self, xfrd, val):
cbok(val)
def unknown(self, xfrd, val):
- print 'unknown>', val
xfrd.loseConnection()
return None
def xfr_err(self, xfrd, val):
# If we get an error with non-zero code the operation failed.
# An error with code zero indicates hello success.
- print 'xfr_err>', val
v = sxp.child0(val)
- print 'xfr_err>', type(v), v
err = int(sxp.child0(val))
if not err: return
- self.error(err);
+ self.error("transfer daemon (xfrd) error: " + str(err))
xfrd.loseConnection()
return None
def xfr_progress(self, xfrd, val):
- print 'xfr_progress>', val
return None
def xfr_vm_destroy(self, xfrd, val):
- print 'xfr_vm_destroy>', val
try:
vmid = sxp.child0(val)
val = self.xd.domain_destroy(vmid)
del self.paused[vmid]
if vmid in self.suspended:
del self.suspended[vmid]
- except:
+ except StandardError, err:
+ self.add_error("vm_destroy failed")
+ self.add_error(err)
val = errno.EINVAL
return ['xfr.err', val]
def xfr_vm_pause(self, xfrd, val):
- print 'xfr_vm_pause>', val
try:
vmid = sxp.child0(val)
val = self.xd.domain_pause(vmid)
self.paused[vmid] = 1
- except:
+ except StandardError, err:
+ self.add_error("vm_pause failed")
+ self.add_error(err)
val = errno.EINVAL
return ['xfr.err', val]
def xfr_vm_unpause(self, xfrd, val):
- print 'xfr_vm_unpause>', val
try:
vmid = sxp.child0(val)
val = self.xd.domain_unpause(vmid)
if vmid in self.paused:
del self.paused[vmid]
- except:
+ except StandardError, err:
+ self.add_error("vm_unpause failed")
+ self.add_error(err)
val = errno.EINVAL
return ['xfr.err', val]
Suspending can hang, so we set a timeout and fail if it
takes too long.
"""
- print 'xfr_vm_suspend>', val
try:
vmid = sxp.child0(val)
d = defer.Deferred()
# the domain died. Set a timeout and error handler so the subscriptions
# will be cleaned up if suspending hangs or there is an error.
def onSuspended(e, v):
- print 'xfr_vm_suspend>onSuspended>', e, v
if v[1] != vmid: return
subscribe(on=0)
- d.callback(v)
+ if not d.called:
+ d.callback(v)
def onDied(e, v):
- print 'xfr_vm_suspend>onDied>', e, v
if v[1] != vmid: return
- d.errback(XendError('Domain died'))
+ if not d.called:
+ d.errback(XendError('Domain %s died while suspending' % vmid))
def subscribe(on=1):
if on:
action('xend.domain.died', onDied)
def cberr(err):
- print 'xfr_vm_suspend>cberr>', err
subscribe(on=0)
+ self.add_error("suspend failed")
+ self.add_error(err)
return err
+ d.addErrback(cberr)
+ d.setTimeout(self.timeout)
subscribe()
val = self.xd.domain_shutdown(vmid, reason='suspend')
self.suspended[vmid] = 1
- d.addErrback(cberr)
- d.setTimeout(self.timeout)
return d
except Exception, err:
- print 'xfr_vm_suspend> Exception', err
+ self.add_error("suspend failed")
+ self.add_error(err)
traceback.print_exc()
val = errno.EINVAL
return ['xfr.err', val]
def connectionLost(self, reason=None):
- print 'XfrdInfo>connectionLost>', reason
for vmid in self.suspended:
try:
self.xd.domain_destroy(vmid)
['id', self.xid ],
['state', self.state ],
['live', self.live ],
- ['resource', self.resource] ]
+ ['resource', self.resource ] ]
sxpr_src = ['src', ['host', self.src_host], ['domain', self.src_dom] ]
sxpr.append(sxpr_src)
sxpr_dst = ['dst', ['host', self.dst_host] ]
def request(self, xfrd):
vmconfig = self.vmconfig()
if not vmconfig:
+ self.error(XendError("vm config not found"))
xfrd.loseConnection()
return
- log.info('Migrate BEGIN: ' + str(self.sxpr()))
+ log.info('Migrate BEGIN: %s' % str(self.sxpr()))
eserver.inject('xend.domain.migrate',
- [ self.dominfo.name, self.dominfo.id,
- "begin", self.sxpr() ])
+ [ self.dominfo.name, self.dominfo.id, "begin", self.sxpr() ])
xfrd.request(['xfr.migrate',
self.src_dom,
vmconfig,
self.live,
self.resource ])
-## def xfr_vm_suspend(self, xfrd, val):
-## def cbok(val):
-## # Special case for localhost: destroy devices early.
-## if self.dst_host in ["localhost", "127.0.0.1"]:
-## self.dominfo.restart_cancel()
-## self.dominfo.cleanup()
-## self.dominfo.destroy_console()
-## return val
-
-## d = XfrdInfo.xfr_vm_suspend(self, xfrd, val)
-## d.addCallback(cbok)
-## return d
-
def xfr_migrate_ok(self, xfrd, val):
dom = int(sxp.child0(val))
self.state = 'ok'
self.deferred.callback(self)
def connectionLost(self, reason=None):
- print 'XfrdMigrateInfo>connectionLost>', reason
XfrdInfo.connectionLost(self, reason)
if self.state =='ok':
log.info('Migrate OK: ' + str(self.sxpr()))
else:
self.state = 'error'
- self.error(XendError("migrate failed"))
+ self.error("migrate failed")
log.info('Migrate ERROR: ' + str(self.sxpr()))
eserver.inject('xend.domain.migrate',
- [ self.dominfo.name, self.dominfo.id,
- self.state, self.sxpr() ])
+ [ self.dominfo.name, self.dominfo.id, self.state, self.sxpr() ])
class XendSaveInfo(XfrdInfo):
"""Representation of a save in-progress and its interaction with xfrd.
return sxpr
def request(self, xfrd):
- print '***request>', self.vmconfig()
vmconfig = self.vmconfig()
if not vmconfig:
+ self.error(XendError("vm config not found"))
xfrd.loseConnection()
return
- print '***request> begin'
log.info('Save BEGIN: ' + str(self.sxpr()))
eserver.inject('xend.domain.save',
- [self.dominfo.name, self.dominfo.id,
- "begin", self.sxpr()])
+ [ self.dominfo.name, self.dominfo.id,
+ "begin", self.sxpr() ])
xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file ])
def xfr_save_ok(self, xfrd, val):
self.deferred.callback(self)
def connectionLost(self, reason=None):
- print 'XfrdSaveInfo>connectionLost>', reason
XfrdInfo.connectionLost(self, reason)
if self.state =='ok':
log.info('Save OK: ' + str(self.sxpr()))
else:
self.state = 'error'
- self.error(XendError("save failed"))
+ self.error("save failed")
log.info('Save ERROR: ' + str(self.sxpr()))
eserver.inject('xend.domain.save',
[ self.dominfo.name, self.dominfo.id,
return sxpr
def request(self, xfrd):
- print '***request>', self.file
log.info('restore BEGIN: ' + str(self.sxpr()))
+ eserver.inject('xend.restore', [ 'begin', self.sxpr()])
+
xfrd.request(['xfr.restore', self.file ])
def xfr_restore_ok(self, xfrd, val):
self.state = 'ok'
if not self.deferred.called:
self.deferred.callback(dominfo)
-
+ def connectionLost(self, reason=None):
+ XfrdInfo.connectionLost(self, reason)
+ if self.state =='ok':
+ log.info('Restore OK: ' + self.file)
+ else:
+ self.state = 'error'
+ self.error("restore failed")
+ log.info('Restore ERROR: ' + str(self.sxpr()))
+ eserver.inject('xend.restore', [ self.state, self.sxpr()])
+
class XendMigrate:
"""External api for interaction with xfrd for migrate and save.
Singleton.
self.db.saveall("", self.session_db)
def sync_session(self, xid):
- print 'sync_session>', type(xid), xid, self.session_db[xid]
self.db.save(xid, self.session_db[xid])
def close(self):
self.sync_session(xid)
def _delete_session(self, xid):
- print '***_delete_session>', xid
if xid in self.session:
del self.session[xid]
if xid in self.session_db:
@param info: session
@return: deferred
"""
- def cbremove(val):
- print '***cbremove>', val
+ dfr = defer.Deferred()
+ def cbok(val):
self._delete_session(info.xid)
+ if not dfr.called:
+ dfr.callback(val)
return val
+ def cberr(err):
+ self._delete_session(info.xid)
+ if not dfr.called:
+ dfr.errback(err)
+ return err
self._add_session(info)
- info.deferred.addCallback(cbremove)
- info.deferred.addErrback(cbremove)
+ info.deferred.addCallback(cbok)
+ info.deferred.addErrback(cberr)
xcf = XfrdClientFactory(info)
reactor.connectTCP('localhost', XFRD_PORT, xcf)
- return info.deferred
+ return dfr
def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0, resource=0):
"""Begin to migrate a domain to another host.